{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# ETL test etl process\n",
    "> when you want to get `test`(sample) data to quickly test your ETL process, or need `data from a certain point` to test your ETL process, you can check how to do it here."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 🌌 Get `test`(sample) data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🌠 get `test`(sample) data `w/o config`\n",
    "> when you have created a ETL process and don't wanna set config from the scratch here is a quick way to get the sample data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
      "23/11/14 19:37:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "total data # : 100\n",
      "sample data :\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "[{'id': 'e2ce9284-8691-471b-88e3-ba29a5888fd1',\n",
       "  'name': 'test_fake_ufl',\n",
       "  'text': 'Simple toward doctor any. Rich name reality bad family. Gas mind even important stay describe official.\\nThere recognize campaign wind on. Drop sport however central read.',\n",
       "  'meta': '{\"name\": \"Amanda Ross\", \"age\": 60, \"address\": \"302 Rebecca Camp\\\\nPatrickborough, CT 40755\", \"job\": \"Broadcast engineer\"}'}]"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from dataverse.etl import ETLPipeline\n",
    "\n",
    "etl_pipeline = ETLPipeline()\n",
    "spark, data = etl_pipeline.sample()\n",
    "\n",
    "# default sampling will return 100 `ufl` data\n",
    "print(f\"total data # : {data.count()}\")\n",
    "print(f\"sample data :\")\n",
    "data.take(1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "when you want to increase the sample size do the following\n",
    "```python\n",
    "spark, data = etl_pipeline.sample(n=10000)\n",
    "spark, data = etl_pipeline.sample(10000)\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "total data # : 10000\n",
      "sample data :\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "[{'id': '79081a73-5c82-432d-bf4a-f7de8bf59d12',\n",
       "  'name': 'test_fake_ufl',\n",
       "  'text': 'Serious teacher follow they entire between. Far see issue view throughout order field.\\nWant senior sell amount picture. Tree cell low edge.',\n",
       "  'meta': '{\"name\": \"Jack Yoder\", \"age\": 75, \"address\": \"083 Diana Parkway Suite 438\\\\nLake Amberport, AS 76996\", \"job\": \"Haematologist\"}'}]"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spark, data = etl_pipeline.sample(10000)\n",
    "print(f\"total data # : {data.count()}\")\n",
    "print(f\"sample data :\")\n",
    "data.take(1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 🌠 get `test`(sample) data `w/ config`\n",
    "> this might took some time to get the data but you can choose your own data\n",
    "- this was also introduced in `ETL_03_create_new_etl_process.ipynb`"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Getting sample data `you want`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "spark:\n",
      "  appname: ETL\n",
      "  driver:\n",
      "    memory: 16g\n",
      "etl:\n",
      "- name: data_ingestion___huggingface___hf2raw\n",
      "  args:\n",
      "    name_or_path:\n",
      "    - ai2_arc\n",
      "    - ARC-Challenge\n",
      "- name: utils___sampling___random\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from omegaconf import OmegaConf\n",
    "\n",
    "# load from dict\n",
    "ETL_config = OmegaConf.create({\n",
    "    'spark': {\n",
    "        'appname': 'ETL',\n",
    "        'driver': {'memory': '16g'},\n",
    "    },\n",
    "    'etl': [\n",
    "        {\n",
    "            'name': 'data_ingestion___huggingface___hf2raw',\n",
    "            'args': {'name_or_path': ['ai2_arc', 'ARC-Challenge']}\n",
    "        },\n",
    "        {'name': 'utils___sampling___random'}\n",
    "    ]\n",
    "})\n",
    "\n",
    "print(OmegaConf.to_yaml(ETL_config))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "23/11/14 19:38:01 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Found cached dataset ai2_arc (/root/.cache/huggingface/datasets/ai2_arc/ARC-Challenge/1.0.0/1569c2591ea2683779581d9fb467203d9aa95543bb9b75dcfde5da92529fd7f6)\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "efc259f86fec4f76a1165f661ebf13d2",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "  0%|          | 0/3 [00:00<?, ?it/s]"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "total data # : 280\n",
      "sample data :\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "[{'id': 'Mercury_7029645',\n",
       "  'question': 'Metal atoms will most likely form ions by the',\n",
       "  'choices': Row(text=['loss of electrons.', 'loss of protons.', 'gain of electrons.', 'gain of protons.'], label=['A', 'B', 'C', 'D']),\n",
       "  'answerKey': 'A'}]"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from dataverse.etl import ETLPipeline\n",
    "\n",
    "etl_pipeline = ETLPipeline()\n",
    "spark, data = etl_pipeline.run(ETL_config)\n",
    "print(f\"total data # : {data.count()}\")\n",
    "print(f\"sample data :\")\n",
    "data.take(1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 🌌 Test your ETL process\n",
    "> its time to test your ETL process with the sample data. define ETL process and run it"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "23/11/14 19:38:06 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "total data # : 100\n",
      "sample data :\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "[{'id': 'eec9b075-b786-454c-a398-f69d8cf39739',\n",
       "  'name': 'test_fake_ufl',\n",
       "  'text': 'Country toward ago old right.\\nNewspaper hotel although short. Hair actually building.\\nWe build then blue hundred perform wall.',\n",
       "  'meta': '{\"name\": \"Michael Aguirre\", \"age\": 18, \"address\": \"8324 Jennings Road Apt. 378\\\\nLatoyahaven, MT 27716\", \"job\": \"Television camera operator\"}'}]"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from dataverse.etl import ETLPipeline\n",
    "from dataverse.etl import register_etl\n",
    "\n",
    "etl_pipeline = ETLPipeline()\n",
    "\n",
    "# get sample data\n",
    "spark, data = etl_pipeline.sample()\n",
    "print(f\"total data # : {data.count()}\")\n",
    "print(f\"sample data :\")\n",
    "data.take(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "@register_etl\n",
    "def test___your___etl_process(spark, data, *args, **kwargs):\n",
    "    # add your custom process here\n",
    "    # here we are going to simply remove 'id' key\n",
    "    data = data.map(lambda x: {k: v for k, v in x.items() if k != 'id'})\n",
    "\n",
    "    return data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[{'name': 'test_fake_ufl',\n",
       "  'text': 'Country toward ago old right.\\nNewspaper hotel although short. Hair actually building.\\nWe build then blue hundred perform wall.',\n",
       "  'meta': '{\"name\": \"Michael Aguirre\", \"age\": 18, \"address\": \"8324 Jennings Road Apt. 378\\\\nLatoyahaven, MT 27716\", \"job\": \"Television camera operator\"}'}]"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# test right away\n",
    "# - successfully removed `id` key\n",
    "etl = test___your___etl_process\n",
    "etl()(spark, data).take(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[{'name': 'test_fake_ufl',\n",
       "  'text': 'Country toward ago old right.\\nNewspaper hotel although short. Hair actually building.\\nWe build then blue hundred perform wall.',\n",
       "  'meta': '{\"name\": \"Michael Aguirre\", \"age\": 18, \"address\": \"8324 Jennings Road Apt. 378\\\\nLatoyahaven, MT 27716\", \"job\": \"Television camera operator\"}'}]"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# test it is registered by calling it from etl_pipeline\n",
    "# - successfully removed `id` key\n",
    "etl = etl_pipeline.get('test___your___etl_process')\n",
    "etl()(spark, data).take(1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 🌌 Experiments on the data itself\n",
    "> there is no chosen way to use this `test`(sample) data. you can do whatever you want with it. here are some examples"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[{'id': 'eec9b075-b786-454c-a398-f69d8cf39739',\n",
       "  'name': 'test_fake_ufl',\n",
       "  'text': 'Country toward ago old right.\\nNewspaper hotel although short. Hair actually building.\\nWe build then blue hundred perform wall.',\n",
       "  'meta': '{\"name\": \"Michael Aguirre\", \"age\": 18, \"address\": \"8324 Jennings Road Apt. 378\\\\nLatoyahaven, MT 27716\", \"job\": \"Television camera operator\"}',\n",
       "  'duck': 'is quarking (physics)'}]"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "data.map(lambda x: {**x, 'duck': 'is quarking (physics)'}).take(1)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "llm",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.11"
  },
  "orig_nbformat": 4
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
